跳到主要内容

RocketMQ 的消费方式

1. 基础概念类面试题

Q1: RocketMQ 提供了几种消费方式?请详细说明它们的区别和适用场景。

答案要点: RocketMQ 提供了两种消费方式:Push 模式和 Pull 模式。

Push 模式:

  • RocketMQ 主动将消息推送给消费者
  • 消费者注册并订阅主题或标签后,自动接收消息
  • 适用于实时性要求较高的场景,如消息通知、实时处理

Pull 模式:

  • 消费者主动从 RocketMQ 服务器拉取消息
  • 通过调用拉取消息的 API 从指定队列获取消息
  • 适用于消费者对消息拉取频率和控制要求较高的场景,如离线数据处理、批量处理
提示

RocketMQ 默认使用的是 Pull 模式消费。

Q2: 从底层实现角度来说,Push 模式是真正的推送吗?请解释其内部机制。

答案要点: Push 模式实际上是基于 Pull 模式的封装,并非真正的服务端推送。其内部机制如下:

说明:

  1. Consumer 启动时会创建后台线程
  2. 后台线程不断向 Broker 发送长轮询请求
  3. Broker 如果有消息立即返回,没有消息则等待一段时间
  4. Consumer 收到消息后立即处理,然后发起下一次拉取
  5. 这样实现了类似推送的效果,但本质上还是拉取

2. 实现细节类面试题

Q3: 请详细描述 RocketMQ 消费者启动到消费消息的完整流程。

时序图:

详细说明:

  1. 消费者初始化:创建 DefaultMQPushConsumer 实例,设置必要参数
  2. 订阅设置:通过 subscribe() 方法设置要消费的 Topic 和 Tag
  3. 启动注册:消费者向 NameServer 注册,获取 Broker 路由信息
  4. 负载均衡:根据消费组和队列数量进行负载均衡分配
  5. 消息拉取:PullMessageService 不断从分配的队列拉取消息
  6. 消息消费:ConsumeMessageService 将消息提交给业务逻辑处理
  7. 结果确认:根据消费结果发送 ACK 或重试

Q4: RocketMQ 的负载均衡策略有哪些?请画图说明平均分配策略的工作原理。

答案要点: RocketMQ 提供了以下几种负载均衡策略:

  1. AllocateMessageQueueAveragely:平均分配(默认)
  2. AllocateMessageQueueAveragelyByCircle:环形平均分配
  3. AllocateMessageQueueByConfig:根据配置分配
  4. AllocateMessageQueueByMachineRoom:按机房分配
  5. AllocateMessageQueueConsistentHash:一致性哈希分配

平均分配策略示例:

分配算法:

  • Queue总数 ÷ Consumer总数 = 每个Consumer基础分配数
  • Queue总数 % Consumer总数 = 额外需要分配的Queue数
  • 前面的Consumer多分配一个Queue

3. 高级特性类面试题

Q5: 什么是顺序消息?RocketMQ 如何保证消息的顺序性?

答案要点: 顺序消息是指消息按照发送顺序被消费的消息类型,分为全局顺序和局部顺序。

顺序消息消费流程:

保证顺序性的关键点:

  1. 发送端:使用 MessageQueueSelector 将相关消息发送到同一个 Queue
  2. 存储端:单个 Queue 内消息严格按照到达顺序存储
  3. 消费端:单线程消费,对 Queue 加锁避免并发消费

Q6: RocketMQ 的消息过滤机制有哪些?请详细说明 Tag 过滤和 SQL92 过滤的实现原理。

答案要点:

1. Tag 过滤(Broker端过滤):

2. SQL92 过滤(Broker端过滤):

过滤机制对比:

过滤方式过滤位置性能灵活性适用场景
Tag过滤Broker端简单分类
SQL92过滤Broker端复杂条件
类过滤Consumer端最高自定义逻辑

4. 性能优化类面试题

Q7: 在高并发场景下,如何优化 RocketMQ 消费者的性能?

答案要点:

1. 消费者配置优化:

// Go SDK 示例配置
consumer := rocketmq.NewPushConsumer(
consumer.WithGroupName("high_performance_group"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithConsumeMessageBatchMaxSize(32), // 批量消费
consumer.WithMaxReconsumeTimes(3), // 重试次数
consumer.WithConsumeTimeout(15*time.Minute), // 消费超时
consumer.WithConsumerThreads(20), // 消费线程数
)

2. 性能优化策略:

Q8: 消息积压问题如何排查和解决?请提供完整的解决方案。

排查和解决流程:

具体解决方案:

  1. 临时方案(快速止损):

    • 快速扩容 Consumer 实例
    • 提高消费并发度
    • 临时跳过非关键消息
  2. 长期方案(根本解决):

    • 优化消费逻辑,提高处理效率
    • 实施消息分级,重要消息优先处理
    • 建立监控告警机制

5. 故障处理类面试题

Q9: RocketMQ 消费失败后的重试机制是怎样的?如何设计合理的重试策略?

重试机制流程:

重试延迟时间级别:

  • 1st: 10s
  • 2nd: 30s
  • 3rd: 1min
  • 4th: 2min
  • 5th: 3min
  • ...
  • 16th: 2h

Q10: 在分布式环境下,如何保证 RocketMQ 消费的幂等性?

幂等性保证方案:

实现方案:

  1. 基于 Redis 的幂等性实现:
func (c *OrderConsumer) ConsumeMessage(msgs []*primitive.MessageExt) error {
for _, msg := range msgs {
msgID := msg.MsgId

// 检查是否已处理
exists, err := c.redis.Exists(msgID).Result()
if err != nil {
return err
}
if exists {
continue // 已处理,跳过
}

// 设置处理标记(带过期时间)
err = c.redis.SetNX(msgID, "processing", 30*time.Minute).Err()
if err != nil {
return err
}

// 处理业务逻辑
err = c.processBusiness(msg)
if err != nil {
c.redis.Del(msgID) // 处理失败,删除标记
return err
}

// 更新为已完成状态
c.redis.Set(msgID, "completed", 24*time.Hour)
}
return nil
}

6. 架构设计类面试题

Q11: 如果让你设计一个基于 RocketMQ 的订单系统,如何保证数据一致性?

分布式事务方案设计: